package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * DWD-layer traffic log splitter.
 *
 * <p>Data flow: web/app -> log server (log files) -> Flume -> Kafka (ODS) -> Flink app -> Kafka (DWD).
 * <br>Programs: Mock -> files -> f1.sh -> Kafka (ZK) -> BaseLogApp -> Kafka (ZK).
 *
 * <p>The job reads raw log lines from the {@code topic_log} Kafka topic. Lines that are not usable
 * JSON go to a "Dirty" side output, which is only printed. The job then repairs the
 * {@code common.is_new} flag per device id ({@code common.mid}) with keyed state, and splits every
 * record into page, start, display, action and error streams. Each stream is printed and written
 * to its own Kafka topic.
 */
public class BaseLogApp {

    /**
     * Builds the pipeline described on the class and submits it as job "BaseLogApp".
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from building or executing the Flink job
     */
    public static void main(String[] args) throws Exception {

        // 1. Execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpointing / fault tolerance is currently disabled (commented out). Once enabled, the
        // job can be restarted from a checkpoint or savepoint.
        // Checkpoint every 5 s with exactly-once semantics.
        //env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // Abort checkpoints that take longer than 1 minute.
        //env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // Minimum pause between the end of one checkpoint and the start of the next.
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // Restart strategy: at most 3 failures per day, waiting 1 minute before each restart.
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //        3, Time.days(1L), Time.minutes(1L)
        //));
        // State backend (RocksDB with incremental checkpoints) and checkpoint storage on HDFS.
        //env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        //env.getCheckpointConfig().setCheckpointStorage(
        //      "hdfs://hadoop102:8020/flinkCDC"
        //);
        // User name used for HDFS access.
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 2. Read the raw log topic from Kafka.
        String sourceTopic = "topic_log";
        String groupId = "base_log_app_220718";
        DataStreamSource<String> kafkaDS = env.addSource(KafkaUtil.getFlinkKafkaConsumer(sourceTopic, groupId));

        // 3. Parse each line into a JSONObject. Lines that fail parsing or validation go to the
        //    "Dirty" side output instead of reaching keyBy, where they would fail the job.
        OutputTag<String> dirtyTag = new OutputTag<String>("Dirty") {
        };
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = parseLog(value);
                if (jsonObject != null) {
                    out.collect(jsonObject);
                } else {
                    ctx.output(dirtyTag, value);
                }
            }
        });
        // Dirty data is only printed, not persisted.
        jsonObjDS.getSideOutput(dirtyTag).print("Dirty>>>>>>>>>");

        // 4. Key by device id; parseLog guarantees common.mid is present.
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        // 5. Repair the is_new flag per device.
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {

            // Per-mid date written the first time this job sees the device. It holds the record's
            // date when that record has is_new = "1", otherwise the sentinel "1970-01-01".
            // It is never overwritten afterwards.
            private ValueState<String> firstSeenDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // The state name stays "last-visit-state" so existing checkpoints/savepoints still
                // map onto this state.
                // Deliberately no state TTL: if the stored date expired, a returning device that
                // still reports is_new = "1" would be treated as new again.
                ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("last-visit-state", String.class);
                firstSeenDtState = getRuntimeContext().getState(descriptor);
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {
                String storedDt = firstSeenDtState.value();
                JSONObject common = value.getJSONObject("common");
                String isNew = common.getString("is_new");
                // The record's date as produced by DateFormatUtil.toDate (format defined there).
                String curDt = DateFormatUtil.toDate(value.getLong("ts"));

                if ("1".equals(isNew)) {
                    if (storedDt == null) {
                        // First record of a device unknown to this job: keep is_new, remember the date.
                        firstSeenDtState.update(curDt);
                    } else if (!storedDt.equals(curDt)) {
                        // Device was already seen on an earlier day (e.g. the app was reinstalled
                        // in between), so it is not new.
                        common.put("is_new", "0");
                    }
                } else if (storedDt == null) {
                    // Device reports as old but is unknown to this job: it visited before the job
                    // started, so store a sentinel date far in the past.
                    firstSeenDtState.update("1970-01-01");
                }

                return value;
            }
        });

        // 6. Split by log type. Page logs stay on the main stream; start, display, action and
        //    error logs go to side outputs.
        OutputTag<String> startTag = new OutputTag<String>("start") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("display") {
        };
        OutputTag<String> actionTag = new OutputTag<String>("action") {
        };
        OutputTag<String> errorTag = new OutputTag<String>("error") {
        };
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {

                // A record carrying error info goes to the error stream in full. err is then
                // removed so the start/page/display/action outputs below do not repeat it.
                if (hasText(value.getString("err"))) {
                    ctx.output(errorTag, value.toJSONString());
                    value.remove("err");
                }

                if (hasText(value.getString("start"))) {
                    // Start log.
                    ctx.output(startTag, value.toJSONString());
                    return;
                }

                JSONObject page = value.getJSONObject("page");
                if (page == null) {
                    // Neither a start log nor a page log: nothing to emit on the page, display or
                    // action streams. Any error info was already emitted above.
                    return;
                }

                // Fields copied onto every display/action so those records are self-contained.
                String pageId = page.getString("page_id");
                JSONObject common = value.getJSONObject("common");
                Long ts = value.getLong("ts");

                // Exposure (display) records: they get the page id, the page ts and common.
                JSONArray displays = value.getJSONArray("displays");
                if (displays != null) {
                    for (int i = 0; i < displays.size(); i++) {
                        JSONObject display = displays.getJSONObject(i);
                        if (display == null) {
                            continue;
                        }
                        display.put("page_id", pageId);
                        display.put("ts", ts);
                        // Nested object (not its string form), the same layout as page/start logs.
                        display.put("common", common);
                        ctx.output(displayTag, display.toJSONString());
                    }
                }

                // Action records: they get the page id and common. Their ts is not overwritten.
                JSONArray actions = value.getJSONArray("actions");
                if (actions != null) {
                    for (int i = 0; i < actions.size(); i++) {
                        JSONObject action = actions.getJSONObject(i);
                        if (action == null) {
                            continue;
                        }
                        action.put("page_id", pageId);
                        action.put("common", common);
                        ctx.output(actionTag, action.toJSONString());
                    }
                }

                // Page log: the record itself, without the arrays already emitted above.
                value.remove("displays");
                value.remove("actions");
                out.collect(value.toJSONString());
            }
        });

        // 7. Extract the side outputs, print every stream and write each to its Kafka topic.
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
        DataStream<String> actionDS = pageDS.getSideOutput(actionTag);
        DataStream<String> errorDS = pageDS.getSideOutput(errorTag);

        pageDS.print("pageDS>>>>>");
        startDS.print("startDS>>>>");
        displayDS.print("displayDS>>>>>>");
        actionDS.print("actionDS>>>>>>>");
        errorDS.print("errorDS>>>>>");

        String page_topic = "dwd_traffic_page_log";
        String start_topic = "dwd_traffic_start_log";
        String display_topic = "dwd_traffic_display_log";
        String action_topic = "dwd_traffic_action_log";
        String error_topic = "dwd_traffic_error_log";

        pageDS.addSink(KafkaUtil.getFlinkKafkaProducer(page_topic));
        startDS.addSink(KafkaUtil.getFlinkKafkaProducer(start_topic));
        displayDS.addSink(KafkaUtil.getFlinkKafkaProducer(display_topic));
        actionDS.addSink(KafkaUtil.getFlinkKafkaProducer(action_topic));
        errorDS.addSink(KafkaUtil.getFlinkKafkaProducer(error_topic));

        // 8. Submit the job.
        env.execute("BaseLogApp");

    }

    /**
     * Parses one raw log line and checks for the fields this job dereferences unconditionally.
     *
     * @param line raw log line from Kafka; may be {@code null}
     * @return the parsed object, or {@code null} in any of these cases: the line is not a JSON
     *     object, it has no {@code common} object with a {@code mid}, or it has no {@code ts}
     *     convertible to a long
     */
    private static JSONObject parseLog(String line) {
        try {
            JSONObject log = JSON.parseObject(line);
            if (log == null || log.getLong("ts") == null) {
                return null;
            }
            JSONObject common = log.getJSONObject("common");
            return common != null && common.getString("mid") != null ? log : null;
        } catch (RuntimeException ignored) {
            // Malformed JSON, or a field of an unexpected type: the caller treats it as dirty data.
            return null;
        }
    }

    /**
     * Returns whether {@code s} is non-null and non-empty.
     *
     * @param s value to check; may be {@code null}
     * @return {@code true} if {@code s} has at least one character
     */
    private static boolean hasText(String s) {
        return s != null && !s.isEmpty();
    }

}
